The MongoDB operations log, or oplog, is a powerful yet somewhat mystifying part of the MongoDB core.
The oplog is the basis for replication, and understanding its nuances in your production environment is very
important for long-term success. For more information I recommend reading the
official documentation.
I have been tinkering with “tailing” the oplog for some time, and after reading various blogs and Stack
Overflow questions, and talking with other MongoDB users, I decided to write up some of my findings.
So what do I mean by tailing the oplog? A tailable cursor in MongoDB is a query that has had the tailable
option added to it. This causes the cursor to continue “listening” for new data to be returned. This is of
interest on the oplog because every operation on the database can be seen there. Thus, we can use the oplog
to replicate actions to other databases, set up filter alerts, or do anything else we can dream up. In this
article I am going to show how to connect to the oplog using Java and then send messages based on what is
going on using RabbitMQ. Broadcasting messages will allow us to implement information
consumers completely independently of the code base we will be using to produce the messages.
The code in these examples should be considered example code and not production quality.
All of the code used in the examples may be found on my GitHub site.
There are plenty of excellent tutorials out there on how to set up tailable cursors and tail the oplog,
and I do not claim this will be any better than the others. One thing I have found missing from many of them is
how to connect to a sharded cluster, auto-detect the replica sets inside of it, and then begin
tailing automagically.
So, with that in mind, let's take a look at what will be discussed in this write-up. Below you will find the
system flow we will be creating. We will be using a sharded, replicated MongoDB instance. We will then use a
multi-threaded Java process to connect to each shard's oplog. Once we open the tailable cursor on each oplog
we will broadcast each operation to RabbitMQ.
By moving the consumers of the MongoDB operations out of the tailing program itself we can separate our concerns. We can attach
any sort of consumer to the RabbitMQ server we like. In fact, using fanout techniques we can attach as many RabbitMQ consumers
as we like. Perhaps we want to replicate the MongoDB data to MySQL, or set up filters to watch for specific information flowing
through the system. In the example code we will
be using RabbitMQ routing keys to divert different types of MongoDB operations to different queues. The examples
are designed to all run in a local environment and, when kept to a small scale, do run on my laptop.
To initiate the sharded, replicated MongoDB environment I am using a script written by Andrew Erlichson for MongoDB's
online training courses (which I highly recommend). A Windows version may also be found here.
RabbitMQ is not the focus of this write-up, so I will not be going into detail on how to set it up. For more
information I recommend reading their documentation. At the time
of writing I am working with RabbitMQ version 3.3.5 for Mac. I will say that getting RabbitMQ running on Windows is
possible but not a lot of fun.
On the Java front I will be using Java 7 and primarily running on Mac. I have tested the tailing code on Windows 7
and RHEL. There are quite a few topics being covered, so for this exercise I will not be using MongoDB authentication. While
this is OK for examples, it is probably not good for your production environment.
We will begin by obtaining a connection to our MongoS. The connection code is located in our main class,
ShardedReplicaTailer:
Properties mongoConnectionProperties = loadProperties();
hostMongoS = new MongoClient(mongoConnectionProperties.getProperty("mongosHostInfo"));
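A minimal sketch of the properties file that loadProperties() reads might look like the following. The key name mongosHostInfo comes from the code above; the host and port values are assumptions for a local setup, not taken from the repository:

```properties
# Connection information for the mongos router (hypothetical local values)
mongosHostInfo=localhost:27017
```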
The properties file that is being loaded currently points to localhost. Once we have a connection to a MongoS we can
use the
ShardSetFinder to obtain the shard information:
DBCursor find = mongoS.getDB("admin").getSisterDB("config").getCollection("shards").find();
To put this line of code into context: the overall method obtains a map of the shards and returns it to the calling code.
Here is the code block that builds the map of shard names to Mongo clients.
public Map<String, MongoClient> findShardSets(MongoClient mongoS) {
    DBCursor find = mongoS.getDB("admin").getSisterDB("config").getCollection("shards").find();
    Map<String, MongoClient> shardSets = new HashMap<String, MongoClient>();
    while (find.hasNext()) {
        DBObject next = find.next();
        String key = (String) next.get("_id");
        shardSets.put(key, getMongoClient(buildServerAddressList(next)));
    }
    find.close();
    return shardSets;
}
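The helpers buildServerAddressList and getMongoClient referenced above are not shown here. The core of the first one is parsing the shard document's host field; a self-contained sketch of that parsing logic follows, using plain strings instead of the driver's ServerAddress so it can stand alone (the real helper would wrap each host:port pair in a ServerAddress and hand the list to a MongoClient):

```java
import java.util.ArrayList;
import java.util.List;

public class ShardHostParser {

    // A shard document's "host" field looks like
    // "s0/localhost:37017,localhost:37018,localhost:37019":
    // the replica-set name, a slash, then a comma-separated member list.
    public static List<String> parseHosts(String hostField) {
        // Drop the "setName/" prefix, keeping only the member list.
        String members = hostField.substring(hostField.indexOf('/') + 1);
        List<String> hosts = new ArrayList<>();
        for (String hostPort : members.split(",")) {
            hosts.add(hostPort.trim());
        }
        return hosts;
    }

    public static void main(String[] args) {
        System.out.println(parseHosts("s0/localhost:37017,localhost:37018,localhost:37019"));
    }
}
```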
Example output when running a local replicated sharded environment:
Adding { "_id" : "s0" , "host" : "s0/localhost:37017,localhost:37018,localhost:37019"}
Adding { "_id" : "s1" , "host" : "s1/localhost:47017,localhost:47018,localhost:47019"}
Adding { "_id" : "s2" , "host" : "s2/localhost:57017,localhost:57018,localhost:57019"}
Here we can see that we have added the shard names (s0 - s2) and their associated server lists (replica sets).
From here we need to create connections to the various replica sets to gain access to the oplog. In this
case we will spawn a thread for each shard so we can tail each shard set independently. One other very
important thing to keep in mind is the timestamp of the operations. Unless you wish to replay the entire oplog
every time the program starts, you must persist the timestamp of the latest operation per shard. This is why I
used a map to hold the connection information: each thread may keep track of the latest timestamp for its
specific shard, using the shard name (the map key) to do so.
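One possible way to persist the last-seen timestamp per shard is a plain properties file keyed by shard name. This is only a sketch under my own assumptions (the file format, key scheme, and class name are not from the article's repository); a BSON timestamp is a (seconds, increment) pair, stored here as "seconds,inc":

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class TimestampStore {

    private final Path file;
    private final Properties props = new Properties();

    public TimestampStore(Path file) throws Exception {
        this.file = file;
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                props.load(in);
            }
        }
    }

    // Record the latest oplog timestamp seen for a shard and flush to disk.
    public void save(String shardName, int seconds, int inc) throws Exception {
        props.setProperty(shardName, seconds + "," + inc);
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, "last oplog timestamp per shard");
        }
    }

    // Returns {seconds, inc}, or null when the shard has never been seen
    // (in which case the tailer would replay from the start of its query).
    public int[] load(String shardName) {
        String value = props.getProperty(shardName);
        if (value == null) return null;
        String[] parts = value.split(",");
        return new int[] { Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) };
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("oplog-ts", ".properties");
        TimestampStore store = new TimestampStore(file);
        store.save("s0", 1408822200, 1);
        System.out.println(java.util.Arrays.toString(store.load("s0")));
    }
}
```

Each tailing thread would call save after processing an operation and load on startup to build its resume query.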
Code excerpt from: OplogTail
...
DBCollection fromCollection = client.getDB("local").getCollection("oplog.rs");
DBObject timeQuery = getTimeQuery();
DBCursor opCursor = fromCollection.find(timeQuery)
        .sort(new BasicDBObject("$natural", 1))
        .addOption(Bytes.QUERYOPTION_TAILABLE)
        .addOption(Bytes.QUERYOPTION_AWAITDATA)
        .addOption(Bytes.QUERYOPTION_NOTIMEOUT);
...
private DBObject getTimeQuery() {
    return lastTimeStamp == null
            ? new BasicDBObject()
            : new BasicDBObject("ts", new BasicDBObject("$gt", lastTimeStamp));
}
...
OK, let's break down what went on here. The three cursor options are what turn an ordinary query into a tailing one:
Bytes.QUERYOPTION_TAILABLE: the cursor remains open after the initial results are returned, so new oplog entries can be read as they are written.
Bytes.QUERYOPTION_AWAITDATA: instead of returning immediately when no data is available, the server blocks for a short period waiting for new data, saving us from busy polling.
Bytes.QUERYOPTION_NOTIMEOUT: prevents the server from closing the cursor after its usual idle timeout.
The sort on $natural reads the oplog in insertion order, and the time query lets each thread resume from the last timestamp it processed.
At this point we are ready to start tailing. The first thing we will do is use an injection strategy to create our RabbitMQ tailing class. I will not
go over this class now, but you may find the code here: TailTypeInjector. This injection strategy will allow us to chain tailing operations together at a later date.
package org.mongo.tail;

import com.mongodb.DBObject;

public interface TailType {
    public void tailOp(DBObject op);
    public void close();
}
Next we define the following abstract class, AbstractGenericType:
package org.mongo.tail.types;

import org.mongo.tail.TailType;

import com.mongodb.DBObject;

public abstract class AbstractGenericType implements TailType {

    @Override
    public void tailOp(DBObject op) {
        switch ((String) op.get("op")) {
        case "u":
            // Ignore internal writes to the repl.time namespace.
            if (!"repl.time".equals((String) op.get("ns"))) {
                handleUpdates(op);
            }
            break;
        case "i":
            handleInserts(op);
            break;
        case "d":
            handleDeletes(op);
            break;
        default:
            handleOtherOps(op);
            break;
        }
    }

    protected void handleOtherOps(DBObject op) {
        System.out.println("Non-handled operation: " + op);
    }

    protected abstract void handleDeletes(DBObject op);
    protected abstract void handleInserts(DBObject op);
    protected abstract void handleUpdates(DBObject op);

    @Override
    public void close() {}
}
Finally, RabbitProducerType extends AbstractGenericType and publishes each operation to RabbitMQ, using the operation type as the routing key:

package org.mongo.tail.types;

import java.io.IOException;

import com.mongodb.DBObject;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitProducerType extends AbstractGenericType {

    private static final String EXCHANGE_NAME = "mongo-tail";
    private Connection connection;
    private Channel channel;

    public RabbitProducerType() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void publishMessage(DBObject op, String routingKey) {
        try {
            String message = op.toString();
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void handleDeletes(DBObject op) {
        publishMessage(op, "d");
    }

    @Override
    protected void handleInserts(DBObject op) {
        publishMessage(op, "i");
    }

    @Override
    protected void handleUpdates(DBObject op) {
        publishMessage(op, "u");
    }

    @Override
    public void close() {
        try {
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Jai Hirsch
Senior Systems Architect
CARFAX
jai.hirsch@gmail.com